本文为您介绍Avro格式的使用示例、配置选项和类型映射。
背景信息
Avro格式允许基于Avro的结构读写Avro数据。当前,Avro结构是基于表结构推导而来的。支持Avro格式的连接器包括消息队列Kafka、Upsert Kafka和对象存储OSS。
使用示例
利用Kafka以及Avro格式构建表的示例如下。
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'avro'
)
配置选项
参数 | 是否必选 | 默认值 | 类型 | 描述 |
format | 是 | (none) | String | 声明使用的格式。使用Avro格式时,参数取值为avro。 |
avro.codec | 否 | (none) | String | 指定Avro压缩的编解码器,仅适用于连接器为Filesystem的情况。参数取值如下:
|
类型映射
Flink与Avro的数据类型的映射关系如下。
Flink SQL类型 | Avro类型 |
CHAR / VARCHAR / STRING | string |
BOOLEAN | boolean |
BINARY / VARBINARY | bytes |
DECIMAL | fixed 说明 带有精度的十进制数。 |
TINYINT | int |
SMALLINT | int |
INT | int |
BIGINT | long |
FLOAT | float |
DOUBLE | double |
DATE | int 说明 日期。 |
TIME | int 说明 以毫秒为单位的时间。 |
TIMESTAMP | long 说明 以毫秒为单位的时间戳。 |
ARRAY | array |
MAP 说明 元素必须是STRING、CHAR或VARCHAR类型。 | map |
MULTISET 说明 元素必须是STRING、CHAR或VARCHAR类型。 | map |
ROW | record |
除了以上类型,Flink支持读取和写入nullable的类型。Flink将nullable的类型映射到Avro union(something, null),其中something是从Flink类型转换的Avro类型。
说明
关于Avro类型的信息,详情请见Avro 规范。
文档内容是否对您有帮助?